/**
 * Copyright (C), 2017-2018, XXX有限公司
 * FileName: NettyClient
 * Author:   zengjian
 * Date:     2018/10/8 17:16
 * Description: NettyClient
 * History:
 * <author>          <time>          <version>          <desc>
 * 作者姓名           修改时间           版本号              描述
 */
package org.yinxue.framework.rpc.tansport;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.MessageToMessageEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yinxue.framework.rpc.RpcRequest;
import org.yinxue.framework.rpc.RpcResponse;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * 〈NettyClient〉<br>
 * 〈一句话描述〉
 *
 * @author zengjian
 * @create 2018/10/8 17:16
 */
public class NettyClient implements Closeable {

    public static final Logger LOGGER = LoggerFactory.getLogger(NettyClient.class);

    private AtomicBoolean isConnected = new AtomicBoolean(false);
    private EventLoopGroup group = new NioEventLoopGroup(10);
    private Bootstrap bootstrap;
    private final ConcurrentHashMap<String, ArrayBlockingQueue<RpcResponse>> rpcResponseMap = new ConcurrentHashMap<>(1024);
    private Channel channel;


    public RpcResponse sendRpcRequest(final RpcRequest rpcRequest) throws InterruptedException, IOException {
        LOGGER.info("NettyClient发送请求:{}", rpcRequest);
        ArrayBlockingQueue<RpcResponse> queue = rpcResponseMap.putIfAbsent(rpcRequest.getRequestId(), new ArrayBlockingQueue<RpcResponse>(1));
        // 如果不为空，说明已经put过了，只需要等待返回结果即可
        if (queue != null) {
            return queue.take();
        }
        try {
            if (channel == null || !channel.isOpen()) {
                connect();
            }
            ChannelFuture future = channel.writeAndFlush(rpcRequest);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        LOGGER.info("NettyClient请求处理成功");
                    }
                }
            });
            RpcResponse rpcResponse = rpcResponseMap.get(rpcRequest.getRequestId()).take();
            LOGGER.info("NettyClient请求结果:{}", rpcResponse);
            if (rpcResponse!=null){
                rpcResponseMap.remove(rpcRequest.getRequestId());
            }
            return rpcResponse;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }

    public void connect(String host, int port) throws InterruptedException {
        bootstrap = new Bootstrap();
        bootstrap.group(group).channel(NioSocketChannel.class)
                .option(ChannelOption.TCP_NODELAY, true)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ClientInitHandler());
        this.channel = bootstrap.connect(host, port).syncUninterruptibly().channel();
    }

    public void connect() throws InterruptedException {
        connect(NettyServer.DEFAULT_HOST, NettyServer.DEFAULT_PORT);
    }

    @Override
    public void close() throws IOException {
        isConnected.compareAndSet(true, false);
        group.shutdownGracefully();
    }

    private class ClientInitHandler extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("ClientWriteHandler", new ClientWriteHandler());
            pipeline.addLast("ClientReadHandler", new ClientReadHandler());
        }
    }


    private class ClientWriteHandler extends MessageToMessageEncoder<RpcRequest> {

        @Override
        protected void encode(ChannelHandlerContext ctx, RpcRequest msg, List<Object> out) throws Exception {
            LOGGER.info("NettyClient write事件触发");
            byte[] bytes = HessianEncoder.INSTANCE.encode(msg);
            ByteBuf buf = Unpooled.buffer(bytes.length);
            buf.writeBytes(bytes);
            out.add(buf);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error("ClientWriteHandler发生异常", cause);
            ctx.close();
        }
    }

    private class ClientReadHandler extends ChannelInboundHandlerAdapter {

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            // 释放资源
            buf.release();
            RpcResponse rpcResponse = (RpcResponse) HessianDecoder.INSTANCE.decode(req);
            LOGGER.debug("NettyClient read事件触发，返回结果:{}", rpcResponse);
            rpcResponseMap.get(rpcResponse.getRequestId()).offer(rpcResponse);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            LOGGER.error("ClientReadHandler发生异常", cause);
            ctx.close();
        }
    }

    public static void main(String[] args) throws InterruptedException, IOException {
        NettyClient client = new NettyClient();
        long time1 = System.currentTimeMillis();
        for (int i = 0; i < 100000; i++) {
            RpcRequest rpcRequest = new RpcRequest();
            rpcRequest.setServiceName(i + "");
            client.sendRpcRequest(rpcRequest);
        }
        long time2 = System.currentTimeMillis();
        System.out.println(time2 - time1);
        System.out.println("TPS/S:" + 100000 / ((time2 - time1) / 1000) );
    }

}